/*
 * Copyright (c) 2024, Alibaba Cloud;
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.aliyun.dataworks.migrationx.transformer.core.spark.command;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

/**
 * Redirects lines read from a given input stream to a j.u.l.Logger (at INFO level).
 *
 * <p>Reading happens on a dedicated thread obtained from the supplied {@link ThreadFactory}; that thread is started
 * by the constructor. While redirection is active, the most recent line that looks like an error message is also
 * remembered and exposed through {@link #getError()}.
 */
class OutputRedirector {

    /** Matches the trailing whitespace of a line; compiled once instead of once per line read. */
    private static final Pattern TRAILING_WHITESPACE = Pattern.compile("\\s*$");

    private final BufferedReader reader;
    private final Logger sink;
    private final Thread thread;
    private final ChildProcAppHandle callback;

    /** Written by {@link #stop()} and read by the redirect thread, hence volatile. */
    private volatile boolean active;
    /** Written by the redirect thread and read through {@link #getError()}, hence volatile. */
    private volatile Throwable error;

    /**
     * Creates a redirector without a completion callback and starts its reader thread.
     *
     * @param in         stream to read lines from (decoded as UTF-8)
     * @param loggerName name of the j.u.l.Logger that receives the lines
     * @param tf         factory used to create the reader thread
     */
    OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
        this(in, loggerName, tf, null);
    }

    /**
     * Creates a redirector and starts its reader thread.
     *
     * @param in         stream to read lines from (decoded as UTF-8)
     * @param loggerName name of the j.u.l.Logger that receives the lines
     * @param tf         factory used to create the reader thread
     * @param callback   handle whose {@code monitorChild()} is invoked once reading has finished; may be
     *                   {@code null}, in which case nothing is invoked
     */
    OutputRedirector(
        InputStream in,
        String loggerName,
        ThreadFactory tf,
        ChildProcAppHandle callback) {
        this.active = true;
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        this.thread = tf.newThread(this::redirect);
        this.sink = Logger.getLogger(loggerName);
        this.callback = callback;
        // Started last so that every field above is assigned before the reader thread can observe this object.
        thread.start();
    }

    /**
     * Body of the reader thread: consumes the stream line by line until end of stream or an I/O failure, then
     * notifies the callback (if any).
     */
    private void redirect() {
        // try-with-resources closes the reader (and the stream it wraps) once reading ends, whether at end of
        // stream or on failure. The close happens before the catch/finally clauses run, so a failure while closing
        // is also an IOException handled by the catch clause below.
        try (BufferedReader in = reader) {
            String line;
            while ((line = in.readLine()) != null) {
                // When inactive the line is still consumed, just not logged or inspected; see stop().
                if (active) {
                    sink.info(TRAILING_WHITESPACE.matcher(line).replaceFirst(""));
                    if (looksLikeError(line)) {
                        // Only the latest matching line is kept; the untrimmed line is used as the message.
                        error = new RuntimeException(line);
                    }
                }
            }
        } catch (IOException e) {
            sink.log(Level.FINE, "Error reading child process output.", e);
        } finally {
            if (callback != null) {
                callback.monitorChild();
            }
        }
    }

    /**
     * Heuristic used to pick error lines out of the output: the line mentions "Error" or "Exception" (in any
     * letter case) and does not contain {@code "at "}.
     * NOTE(review): the {@code "at "} exclusion presumably filters out stack-trace frames - confirm.
     */
    private static boolean looksLikeError(String line) {
        boolean mentionsFailure = containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception");
        return mentionsFailure && !line.contains("at ");
    }

    /**
     * This method just stops the output of the process from showing up in the local logs. The child's output will still
     * be read (and, thus, the redirect thread will still be alive) to avoid the child process hanging because of lack
     * of output buffer.
     */
    void stop() {
        active = false;
    }

    /**
     * @return whether the reader thread is still alive
     */
    boolean isAlive() {
        return thread.isAlive();
    }

    /**
     * @return a {@link RuntimeException} wrapping the most recent output line recognised as an error while
     *         redirection was active, or {@code null} if no such line has been seen
     */
    Throwable getError() {
        return error;
    }

    /**
     * Copied from Apache Commons Lang {@code StringUtils#containsIgnoreCase(String, String)}
     *
     * @param str       string to search in; may be {@code null}
     * @param searchStr string to look for; may be {@code null}
     * @return {@code true} if {@code str} contains {@code searchStr} irrespective of case; {@code false} if either
     *         argument is {@code null}
     */
    private static boolean containsIgnoreCase(String str, String searchStr) {
        if (str == null || searchStr == null) {
            return false;
        }
        int len = searchStr.length();
        int max = str.length() - len;
        for (int i = 0; i <= max; i++) {
            if (str.regionMatches(true, i, searchStr, 0, len)) {
                return true;
            }
        }
        return false;
    }
}
